Entry point

Let's create an entry point to the DataFrame and Dataset API.


In [2]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("Working with Spark SQL").getOrCreate()

The first thing we will read is a JSON file that represents the periodic table. We load it into a DataFrame and then run several operations on it, much as we would with an RDD.

Note: by default, Spark SQL's JSON reader expects one JSON object per line (the JSON Lines format), much like a CSV file holds one record per line. That is why the list of objects in the original file has to be rewritten as one line per object.


In [3]:
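import json

# Load the whole JSON document: a list with one object per chemical element
with open('sql/PeriodicTableJSON.json') as data_file:
    data = json.load(data_file)

# Write one JSON object per line (JSON Lines), the layout spark.read.json expects by default
with open('sql/PeriodicTableJSON.jsonl', 'w') as outfile:
    for entry in data:
        json.dump(entry, outfile)
        outfile.write('\n')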

In [4]:
df = spark.read.json("sql/PeriodicTableJSON.jsonl")

In [5]:
df.show()


+--------------------+-------------+-------+--------------------+-----+-------+--------------------+-------+----------+----------+--------------------+------+------+-----+--------------------+--------------------+--------------------+------+----+----+
|          appearance|  atomic_mass|   boil|            category|color|density|       discovered_by|   melt|molar_heat|      name|            named_by|number|period|phase|              source|        spectral_img|             summary|symbol|xpos|ypos|
+--------------------+-------------+-------+--------------------+-----+-------+--------------------+-------+----------+----------+--------------------+------+------+-----+--------------------+--------------------+--------------------+------+----+----+
|       colorless gas|        1.008| 20.271|   diatomic nonmetal| null|0.08988|     Henry Cavendish|  13.99|    28.836|  Hydrogen|   Antoine Lavoisier|     1|     1|  Gas|https://en.wikipe...|https://en.wikipe...|Hydrogen is a che...|     H|   1|   1|
|colorless gas, ex...|    4.0026022|  4.222|           noble gas| null| 0.1786|      Pierre Janssen|   0.95|      null|    Helium|                null|     2|     1|  Gas|https://en.wikipe...|https://en.wikipe...|Helium is a chemi...|    He|  18|   1|
|       silvery-white|         6.94| 1603.0|        alkali metal| null|  0.534|Johan August Arfw...| 453.65|     24.86|   Lithium|                null|     3|     2|Solid|https://en.wikipe...|                null|Lithium (from Gre...|    Li|   1|   2|
| white-gray metallic|   9.01218315| 2742.0|alkaline earth metal| null|   1.85|Louis Nicolas Vau...| 1560.0|    16.443| Beryllium|                null|     4|     2|Solid|https://en.wikipe...|                null|Beryllium is a ch...|    Be|   2|   2|
|         black-brown|        10.81| 4200.0|           metalloid| null|   2.08|Joseph Louis Gay-...| 2349.0|    11.087|     Boron|                null|     5|     2|Solid|https://en.wikipe...|                null|Boron is a metall...|     B|  13|   2|
|                null|       12.011|   null| polyatomic nonmetal| null|  1.821|       Ancient Egypt|   null|     8.517|    Carbon|                null|     6|     2|Solid|https://en.wikipe...|https://en.wikipe...|Carbon (from Lati...|     C|  14|   2|
|colorless gas, li...|       14.007| 77.355|   diatomic nonmetal| null|  1.251|   Daniel Rutherford|  63.15|      null|  Nitrogen|Jean-Antoine Chaptal|     7|     2|  Gas|https://en.wikipe...|https://en.wikipe...|Nitrogen is a che...|     N|  15|   2|
|                null|       15.999| 90.188|   diatomic nonmetal| null|  1.429|Carl Wilhelm Scheele|  54.36|      null|    Oxygen|   Antoine Lavoisier|     8|     2|  Gas|https://en.wikipe...|https://en.wikipe...|Oxygen is a chemi...|     O|  16|   2|
|                null|18.9984031636|  85.03|   diatomic nonmetal| null|  1.696|  André-Marie Ampère|  53.48|      null|  Fluorine|        Humphry Davy|     9|     2|  Gas|https://en.wikipe...|                null|Fluorine is a che...|     F|  17|   2|
|colorless gas exh...|     20.17976| 27.104|           noble gas| null| 0.9002|      Morris Travers|  24.56|      null|      Neon|                null|    10|     2|  Gas|https://en.wikipe...|https://en.wikipe...|Neon is a chemica...|    Ne|  18|   2|
|silvery white met...| 22.989769282|1156.09|        alkali metal| null|  0.968|        Humphry Davy|370.944|     28.23|    Sodium|                null|    11|     3|Solid|https://en.wikipe...|https://en.wikipe...|Sodium /ˈsoʊdiəm/...|    Na|   1|   3|
|    shiny grey solid|       24.305| 1363.0|alkaline earth metal| null|  1.738|        Joseph Black|  923.0|    24.869| Magnesium|                null|    12|     3|Solid|https://en.wikipe...|https://en.wikipe...|Magnesium is a ch...|    Mg|   2|   3|
|silvery gray meta...|  26.98153857| 2743.0|post-transition m...| null|    2.7|                null| 933.47|      24.2| Aluminium|        Humphry Davy|    13|     3|Solid|https://en.wikipe...|                null|Aluminium (or alu...|    Al|  13|   3|
|crystalline, refl...|       28.085| 3538.0|           metalloid| null|  2.329|Jöns Jacob Berzelius| 1687.0|    19.789|   Silicon|Thomas Thomson (c...|    14|     3|Solid|https://en.wikipe...|https://en.wikipe...|Silicon is a chem...|    Si|  14|   3|
|colourless, waxy ...|30.9737619985|   null| polyatomic nonmetal| null|   null|        Hennig Brand|   null|    23.824|Phosphorus|                null|    15|     3|Solid|https://en.wikipe...|                null|Phosphorus is a c...|     P|  15|   3|
|lemon yellow sint...|        32.06|  717.8| polyatomic nonmetal| null|   2.07|       Ancient china| 388.36|     22.75|    Sulfur|                null|    16|     3|Solid|https://en.wikipe...|https://en.wikipe...|Sulfur or sulphur...|     S|  16|   3|
|pale yellow-green...|        35.45| 239.11|   diatomic nonmetal| null|    3.2|Carl Wilhelm Scheele|  171.6|      null|  Chlorine|                null|    17|     3|  Gas|https://en.wikipe...|https://en.wikipe...|Chlorine is a che...|    Cl|  17|   3|
|colorless gas exh...|      39.9481| 87.302|           noble gas| null|  1.784|       Lord Rayleigh|  83.81|      null|     Argon|                null|    18|     3|  Gas|https://en.wikipe...|https://en.wikipe...|Argon is a chemic...|    Ar|  18|   3|
|        silvery gray|     39.09831| 1032.0|        alkali metal| null|  0.862|        Humphry Davy|  336.7|      29.6| Potassium|                null|    19|     4|Solid|https://en.wikipe...|https://en.wikipe...|Potassium is a ch...|     K|   1|   4|
|                null|      40.0784| 1757.0|alkaline earth metal| null|   1.55|        Humphry Davy| 1115.0|    25.929|   Calcium|                null|    20|     4|Solid|https://en.wikipe...|https://en.wikipe...|Calcium is a chem...|    Ca|   2|   4|
+--------------------+-------------+-------+--------------------+-----+-------+--------------------+-------+----------+----------+--------------------+------+------+-----+--------------------+--------------------+--------------------+------+----+----+
only showing top 20 rows


In [6]:
df.printSchema()


root
 |-- appearance: string (nullable = true)
 |-- atomic_mass: double (nullable = true)
 |-- boil: double (nullable = true)
 |-- category: string (nullable = true)
 |-- color: string (nullable = true)
 |-- density: double (nullable = true)
 |-- discovered_by: string (nullable = true)
 |-- melt: double (nullable = true)
 |-- molar_heat: double (nullable = true)
 |-- name: string (nullable = true)
 |-- named_by: string (nullable = true)
 |-- number: string (nullable = true)
 |-- period: long (nullable = true)
 |-- phase: string (nullable = true)
 |-- source: string (nullable = true)
 |-- spectral_img: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- xpos: long (nullable = true)
 |-- ypos: long (nullable = true)


In [7]:
df.select("name").show()


+----------+
|      name|
+----------+
|  Hydrogen|
|    Helium|
|   Lithium|
| Beryllium|
|     Boron|
|    Carbon|
|  Nitrogen|
|    Oxygen|
|  Fluorine|
|      Neon|
|    Sodium|
| Magnesium|
| Aluminium|
|   Silicon|
|Phosphorus|
|    Sulfur|
|  Chlorine|
|     Argon|
| Potassium|
|   Calcium|
+----------+
only showing top 20 rows

We select the chemical elements whose atomic mass is below 200 and show the first 10.


In [8]:
df.select(df['name'], df['atomic_mass']).filter(df['atomic_mass'] < 200).show(10)


+---------+-------------+
|     name|  atomic_mass|
+---------+-------------+
| Hydrogen|        1.008|
|   Helium|    4.0026022|
|  Lithium|         6.94|
|Beryllium|   9.01218315|
|    Boron|        10.81|
|   Carbon|       12.011|
| Nitrogen|       14.007|
|   Oxygen|       15.999|
| Fluorine|18.9984031636|
|     Neon|     20.17976|
+---------+-------------+
only showing top 10 rows


In [9]:
df.groupBy('phase').count().show()


+------+-----+
| phase|count|
+------+-----+
| Solid|  104|
|Liquid|    2|
|   Gas|   12|
+------+-----+

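Now let's see how to create, from a DataFrame, a global temporary view on which we can run SQL statements. Global temporary views are registered in the system database `global_temp`, so queries must refer to them by the qualified name `global_temp.<view>`.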


In [10]:
df.createGlobalTempView("chemistryTable")

In [11]:
spark.sql("select name from global_temp.chemistryTable").show(5)


+---------+
|     name|
+---------+
| Hydrogen|
|   Helium|
|  Lithium|
|Beryllium|
|    Boron|
+---------+
only showing top 5 rows

As we mentioned in the post, Python does not let you build Datasets: the typed Dataset API is only available in Scala and Java (in Scala, a DataFrame is simply a `Dataset[Row]`). To give you an idea if you come from the Java or Scala world, creating a Dataset starts from a class definition and lets you add objects of that class; the result is a table-like structure, just like the DataFrame shown here.

Inferring the schema

Spark SQL offers two ways to turn an RDD into a DataFrame with a schema. The first infers the schema by reflection, from the `Row` objects and the types of their fields; the second specifies the schema explicitly in code. Below we look at both, using a text file that lists each chemical element and its atomic mass.


In [12]:
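from pyspark.sql import Row

sc = spark.sparkContext

# Each line of the file holds "name,atomic_mass", e.g. "Hydrogen,1.008"
lines = sc.textFile("sql/Periodictable.txt")
parts = lines.map(lambda p: p.split(","))

# Build Row objects; Spark infers the schema from their fields and value types
elements = parts.map(lambda e: Row(name=e[0], atomic_mass=float(e[1])))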

In [13]:
schemeElements = spark.createDataFrame(elements)
schemeElements.createOrReplaceTempView("elements")

In [14]:
lightElements = spark.sql("select name from elements where atomic_mass > 0 and atomic_mass < 21")

In [15]:
lightElemName = lightElements.rdd.map(lambda elem: "Name: " + elem.name).collect()
for name in lightElemName:
    print(name)


Name: Hydrogen
Name: Helium
Name: Lithium
Name: Beryllium
Name: Boron
Name: Carbon
Name: Nitrogen
Name: Oxygen
Name: Fluorine
Name: Neon

Now let's see how to do the same programmatically.


In [16]:
from pyspark.sql.types import StringType, StructField, StructType

sc = spark.sparkContext

lines = sc.textFile("sql/Periodictable.txt")
parts = lines.map(lambda line: line.split(","))
# Plain tuples: the column names come from the schema defined below
elements = parts.map(lambda p: (p[0], p[1]))

# Build the schema from a string of column names; every column is a nullable string
schemeString = "name atomicMass"
fields = [StructField(field_name, StringType(), True) for field_name in schemeString.split()]
scheme = StructType(fields)

# Apply the explicit schema to the RDD of tuples
schemeElements = spark.createDataFrame(elements, scheme)

schemeElements.createOrReplaceTempView("elements")

spark.sql("select name, atomicMass from elements").show()

schemeElements.printSchema()


+---------+-------------+
|     name|   atomicMass|
+---------+-------------+
| Hydrogen|        1.008|
|   Helium|    4.0026022|
|  Lithium|         6.94|
|Beryllium|   9.01218315|
|    Boron|        10.81|
|   Carbon|       12.011|
| Nitrogen|       14.007|
|   Oxygen|       15.999|
| Fluorine|18.9984031636|
|     Neon|     20.17976|
+---------+-------------+

root
 |-- name: string (nullable = true)
 |-- atomicMass: string (nullable = true)

Data sources

Spark SQL supports many data formats (json, parquet, jdbc, orc, libsvm, csv, text, ...), and the default one is Parquet (configurable through `spark.sql.sources.default`). In this section we look at handling different data formats and at communicating with Hive, Parquet and JDBC to store and retrieve data.

The first thing we will do is write the result of the previous query to a Parquet file, so that we can then run queries directly against the file.


In [17]:
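# save() without format() uses the default data source (Parquet); overwrite lets the cell be re-run
schemeElements.select("name", "atomicMass").write.mode("overwrite").save("sql/namesAndAtomicMass.parquet")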

And now we can query the file directly.


In [8]:
newDf = spark.sql("select atomicMass from parquet.`sql/namesAndAtomicMass.parquet`")

In [9]:
newDf.show()


+-------------+
|   atomicMass|
+-------------+
|        1.008|
|    4.0026022|
|         6.94|
|   9.01218315|
|        10.81|
|       12.011|
|       14.007|
|       15.999|
|18.9984031636|
|     20.17976|
+-------------+

When you started the Docker cluster, the database ran a books.sql script, located in notebook/initdb, which creates the books table (with title, author and year columns) in the `postgres` database.


In [ ]:
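# The PostgreSQL JDBC driver (org.postgresql.Driver) must be on Spark's classpath,
# e.g. via --jars or spark.jars.packages; "db" is the database host in the Docker network
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://db:5432/postgres") \
    .option("dbtable", "books") \
    .option("user", "postgres") \
    .option("driver", "org.postgresql.Driver") \
    .option("password", "root") \
    .load()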

In [3]:
jdbcDF.show()


+-------------+---------------+----+
|        title|         author|year|
+-------------+---------------+----+
|Cryptonomicon|Neal Stephenson|1998|
| The Cyberiad|  Stanislaw Lem|1985|
|       Friday|Robert Heinlein|1982|
|    The Big U|Neal Stephenson|1988|
+-------------+---------------+----+


In [4]:
jdbcDF.printSchema()


root
 |-- title: string (nullable = true)
 |-- author: string (nullable = true)
 |-- year: integer (nullable = true)